package hbasecli.connection;

import hbasecli.config.DefaultConnectionConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import static org.springframework.util.StringUtils.hasText;

@Component
public class HBaseConnections {

    private static final Logger log = LoggerFactory.getLogger(HBaseConnections.class);

    @Resource
    private DefaultConnectionConfig defaultConnectionConfig;

    @SuppressWarnings("unused")
    @PostConstruct
    private void init() {
        if (!"true".equalsIgnoreCase(defaultConnectionConfig.getImmediate())) {
            return;
        }
        Configuration configuration = new Configuration();
        Map<String, String> usefulParams = new HashMap<>();
        if (hasText(defaultConnectionConfig.getZookeeperPropertyClientPort())) {
            String name = "hbase.zookeeper.property.clientPort";
            String value = defaultConnectionConfig.getZookeeperPropertyClientPort();
            configuration.set(name, value);
            usefulParams.put(name, value);
        }
        if (hasText(defaultConnectionConfig.getMasterPort())) {
            String name = "hbase.master.port";
            String value = defaultConnectionConfig.getMasterPort();
            configuration.set(name, value);
            usefulParams.put(name, value);
        }
        if (hasText(defaultConnectionConfig.getZookeeperQuorum())) {
            String name = "hbase.zookeeper.quorum";
            String value = defaultConnectionConfig.getZookeeperQuorum();
            configuration.set(name, value);
            usefulParams.put(name, value);
        }
        if (hasText(defaultConnectionConfig.getRegionServerPort())) {
            String name = "hbase.regionserver.port";
            String value = defaultConnectionConfig.getRegionServerPort();
            configuration.set(name, value);
            usefulParams.put(name, value);
        }
        Connection connection;
        try {
            connection = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            log.error("连接时出现异常: ", e);
            log.error("创建连接失败: {}", usefulParams);
            return;
        }
        if (connection == null) {
            log.error("创建连接失败: {}", usefulParams);
            return;
        }
        String name = configuration.get("hbase.zookeeper.quorum");
        if (!hasText(name)) {
            name = "connection:" + System.currentTimeMillis();
        }
        log.info("{}", usefulParams);
        // TODO
        put(connection, usefulParams, name);
    }

    private final ConcurrentHashMap<String, ConnectionExt> map = new ConcurrentHashMap<>();

    public Connection get(String id) {
        ConnectionExt connectionExt = map.get(id);
        if (connectionExt == null) {
            return null;
        }
        return connectionExt.getConnection();
    }

    public ConnectionInfo getConnectionInfo(String id) {
        ConnectionExt connectionExt = map.get(id);
        if (connectionExt == null) {
            return null;
        }
        Connection connection = connectionExt.getConnection();
        if (connection == null) {
            return null;
        }
        ConnectionInfo info = new ConnectionInfo();
        info.setId(id);
        info.setName(connectionExt.getName());
        info.setParams(connectionExt.getParams());
        info.setCreateTime(connectionExt.getCreateTime());
        info.setRunning(!connection.isClosed() && !connection.isAborted());
        return info;
    }

    public ConnectionInfo remove(String id) {
        ConnectionExt connectionExt = map.remove(id);
        if (connectionExt == null) {
            return null;
        }
        Connection connection = connectionExt.getConnection();
        if (connection == null) {
            return null;
        }
        try {
            connection.close();
        } catch (IOException e) {
            log.error("HBase 连接关闭: ", e);
        }
        ConnectionInfo info = new ConnectionInfo();
        info.setId(id);
        info.setName(connectionExt.getName());
        info.setParams(connectionExt.getParams());
        info.setCreateTime(connectionExt.getCreateTime());
        info.setRunning(!connection.isClosed() && !connection.isAborted());
        return info;
    }

    public ConnectionInfo put(Connection connection, Map<String, String> params, String name) {
        ConnectionExt connectionExt = new ConnectionExt();
        long createTime = System.currentTimeMillis();
        String id = Long.toString(createTime);
        connectionExt.setId(id);
        connectionExt.setName(name);
        connectionExt.setParams(params);
        connectionExt.setConnection(connection);
        connectionExt.setCreateTime(Long.toString(createTime));
        map.put(id, connectionExt);
        ConnectionInfo info = new ConnectionInfo();
        info.setId(id);
        info.setName(name);
        info.setParams(params);
        info.setCreateTime(connectionExt.getCreateTime());
        info.setRunning(!connection.isClosed() && !connection.isAborted());
        return info;
    }

    public List<ConnectionInfo> list() {
        List<ConnectionInfo> connectionInfoList = new ArrayList<>();
        for (ConnectionExt c : map.values()) {
            ConnectionInfo connectionInfo = new ConnectionInfo();
            connectionInfo.setId(c.getId());
            connectionInfo.setName(c.getName());
            connectionInfo.setParams(c.getParams());
            connectionInfo.setCreateTime(c.getCreateTime());
            Connection connection = c.getConnection();
            if (connection == null) {
                connectionInfo.setRunning(false);
            } else {
                connectionInfo.setRunning(!connection.isClosed() && !connection.isAborted());
            }
            connectionInfoList.add(connectionInfo);
        }
        connectionInfoList.sort((c1, c2) -> c2.getCreateTime().compareTo(c1.getCreateTime()));
        return connectionInfoList;
    }

}